Просмотр исходного кода

Merge remote-tracking branch 'origin/release-58' into release-584

zmstone 1 год назад
Родитель
Сommit
b356a5b9db

+ 45 - 35
apps/emqx/src/emqx_connection.erl

@@ -101,12 +101,13 @@
     %% GC State
     gc_state :: option(emqx_gc:gc_state()),
     %% Stats Timer
-    stats_timer :: disabled | option(reference()),
-    %% Idle Timeout
-    idle_timeout :: integer() | infinity,
+    %% When `disabled` stats are never reported.
+    %% When `paused` stats are not reported until complete CONNECT packet received.
+    %% Connection starts with `paused` by default.
+    stats_timer :: disabled | paused | option(reference()),
     %% Idle Timer
     idle_timer :: option(reference()),
-    %% Idle Timeout
+    %% Hibernate connection process if inactive for
     hibernate_after :: integer() | infinity,
     %% Zone name
     zone :: atom(),
@@ -341,14 +342,10 @@ init_state(
         end,
     StatsTimer =
         case emqx_config:get_zone_conf(Zone, [stats, enable]) of
-            true -> undefined;
+            true -> paused;
             false -> disabled
         end,
-    IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout),
 
-    set_tcp_keepalive(Listener),
-
-    IdleTimer = start_timer(IdleTimeout, idle_timeout),
     #state{
         transport = Transport,
         socket = Socket,
@@ -361,9 +358,7 @@ init_state(
         channel = Channel,
         gc_state = GcState,
         stats_timer = StatsTimer,
-        idle_timeout = IdleTimeout,
-        idle_timer = IdleTimer,
-        hibernate_after = maps:get(hibernate_after, Opts, IdleTimeout),
+        hibernate_after = maps:get(hibernate_after, Opts, get_zone_idle_timeout(Zone)),
         zone = Zone,
         listener = Listener,
         limiter_buffer = queue:new(),
@@ -378,18 +373,19 @@ run_loop(
         transport = Transport,
         socket = Socket,
         peername = Peername,
-        channel = Channel
+        listener = Listener,
+        zone = Zone
     }
 ) ->
     emqx_logger:set_metadata_peername(esockd:format(Peername)),
-    ShutdownPolicy = emqx_config:get_zone_conf(
-        emqx_channel:info(zone, Channel),
-        [force_shutdown]
-    ),
+    ShutdownPolicy = emqx_config:get_zone_conf(Zone, [force_shutdown]),
     emqx_utils:tune_heap_size(ShutdownPolicy),
     case activate_socket(State) of
         {ok, NState} ->
-            hibernate(Parent, NState);
+            ok = set_tcp_keepalive(Listener),
+            IdleTimeout = get_zone_idle_timeout(Zone),
+            IdleTimer = start_timer(IdleTimeout, idle_timeout),
+            hibernate(Parent, NState#state{idle_timer = IdleTimer});
         {error, Reason} ->
             ok = Transport:fast_close(Socket),
             exit_on_sock_error(Reason)
@@ -414,7 +410,6 @@ recvloop(
     Parent,
     State = #state{
         hibernate_after = HibernateAfterMs,
-        channel = Channel,
         zone = Zone
     }
 ) ->
@@ -431,9 +426,7 @@ recvloop(
             true ->
                 recvloop(Parent, State);
             false ->
-                ClientId = emqx_channel:info(clientid, Channel),
-                undefined =/= ClientId andalso
-                    emqx_cm:set_chan_stats(ClientId, stats(State)),
+                _ = try_set_chan_stats(State),
                 hibernate(Parent, cancel_stats_timer(State))
         end
     end.
@@ -443,8 +436,8 @@ handle_recv({system, From, Request}, Parent, State) ->
 handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
     %% FIXME: it's not trapping exit, should never receive an EXIT
     terminate(Reason, State);
-handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) ->
-    case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of
+handle_recv(Msg, Parent, State) ->
+    case process_msg([Msg], ensure_stats_timer(State)) of
         {ok, NewState} ->
             ?MODULE:recvloop(Parent, NewState);
         {stop, Reason, NewSate} ->
@@ -461,10 +454,18 @@ wakeup_from_hib(Parent, State) ->
 %%--------------------------------------------------------------------
 %% Ensure/cancel stats timer
 
--compile({inline, [ensure_stats_timer/2]}).
-ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
+-compile({inline, [ensure_stats_timer/1]}).
+ensure_stats_timer(State = #state{stats_timer = undefined}) ->
+    Timeout = get_zone_idle_timeout(State#state.zone),
     State#state{stats_timer = start_timer(Timeout, emit_stats)};
-ensure_stats_timer(_Timeout, State) ->
+ensure_stats_timer(State) ->
+    %% Either already active, disabled, or paused.
+    State.
+
+-compile({inline, [resume_stats_timer/1]}).
+resume_stats_timer(State = #state{stats_timer = paused}) ->
+    State#state{stats_timer = undefined};
+resume_stats_timer(State = #state{stats_timer = disabled}) ->
     State.
 
 -compile({inline, [cancel_stats_timer/1]}).
@@ -475,6 +476,10 @@ cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) -
 cancel_stats_timer(State) ->
     State.
 
+-compile({inline, [get_zone_idle_timeout/1]}).
+get_zone_idle_timeout(Zone) ->
+    emqx_channel:get_mqtt_conf(Zone, idle_timeout).
+
 %%--------------------------------------------------------------------
 %% Process next Msg
 
@@ -555,9 +560,8 @@ handle_msg(
     State = #state{idle_timer = IdleTimer}
 ) ->
     ok = emqx_utils:cancel_timer(IdleTimer),
-    Serialize = emqx_frame:serialize_opts(ConnPkt),
     NState = State#state{
-        serialize = Serialize,
+        serialize = emqx_frame:serialize_opts(ConnPkt),
         idle_timer = undefined
     },
     handle_incoming(Packet, NState);
@@ -611,7 +615,8 @@ handle_msg(
             maps:get(conn_pid, QSS), {PS, Serialize, Channel}
         ),
     ClientId = emqx_channel:info(clientid, Channel),
-    emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
+    emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
+    {ok, resume_stats_timer(State)};
 handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
     ClientId = emqx_channel:info(clientid, Channel),
     emqx_cm:set_chan_info(ClientId, info(State)),
@@ -727,9 +732,9 @@ handle_timeout(
         socket = Socket
     }
 ) ->
-    emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
     ClientId = emqx_channel:info(clientid, Channel),
     emqx_cm:set_chan_stats(ClientId, stats(State)),
+    emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
     {ok, State#state{stats_timer = undefined}};
 handle_timeout(
     TRef,
@@ -747,6 +752,13 @@ handle_timeout(
 handle_timeout(TRef, Msg, State) ->
     with_channel(handle_timeout, [TRef, Msg], State).
 
+try_set_chan_stats(State = #state{channel = Channel}) ->
+    case emqx_channel:info(clientid, Channel) of
+        %% ClientID is not yet known, nothing to report.
+        undefined -> false;
+        ClientId -> emqx_cm:set_chan_stats(ClientId, stats(State))
+    end.
+
 %%--------------------------------------------------------------------
 %% Parse incoming data
 -compile({inline, [when_bytes_in/3]}).
@@ -1108,10 +1120,8 @@ run_gc(Pubs, Bytes, State = #state{gc_state = GcSt, zone = Zone}) ->
         {_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
     end.
 
-check_oom(Pubs, Bytes, State = #state{channel = Channel}) ->
-    ShutdownPolicy = emqx_config:get_zone_conf(
-        emqx_channel:info(zone, Channel), [force_shutdown]
-    ),
+check_oom(Pubs, Bytes, State = #state{zone = Zone}) ->
+    ShutdownPolicy = emqx_config:get_zone_conf(Zone, [force_shutdown]),
     case emqx_utils:check_oom(ShutdownPolicy) of
         {shutdown, Reason} ->
             %% triggers terminate/2 callback immediately

+ 7 - 7
apps/emqx/test/emqx_connection_SUITE.erl

@@ -188,15 +188,15 @@ t_process_msg(_) ->
     ).
 
 t_ensure_stats_timer(_) ->
-    NStats = emqx_connection:ensure_stats_timer(100, st()),
-    Stats_timer = emqx_connection:info(stats_timer, NStats),
-    ?assert(is_reference(Stats_timer)),
-    ?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)).
+    NStats = emqx_connection:ensure_stats_timer(st(#{stats_timer => undefined})),
+    StatsTimer = emqx_connection:info(stats_timer, NStats),
+    ?assert(is_reference(StatsTimer)),
+    ?assertEqual(NStats, emqx_connection:ensure_stats_timer(NStats)).
 
 t_cancel_stats_timer(_) ->
     NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})),
-    Stats_timer = emqx_connection:info(stats_timer, NStats),
-    ?assertEqual(undefined, Stats_timer),
+    StatsTimer = emqx_connection:info(stats_timer, NStats),
+    ?assertEqual(undefined, StatsTimer),
     ?assertEqual(NStats, emqx_connection:cancel_stats_timer(NStats)).
 
 t_append_msg(_) ->
@@ -272,7 +272,7 @@ t_handle_msg_event(_) ->
     ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
     ok = meck:expect(emqx_cm, insert_channel_info, fun(_, _, _) -> ok end),
     ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
-    ?assertEqual(ok, handle_msg({event, connected}, st())),
+    ?assertMatch({ok, _St}, handle_msg({event, connected}, st())),
     ?assertMatch({ok, _St}, handle_msg({event, disconnected}, st())),
     ?assertMatch({ok, _St}, handle_msg({event, undefined}, st())).
 

+ 52 - 12
apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("common_test/include/ct.hrl").
 
@@ -323,33 +324,76 @@ t_connect_will_retain(Config) ->
     ok = emqtt:disconnect(Client4),
     clean_retained(Topic, Config).
 
-t_connect_idle_timeout(_Config) ->
+t_connect_silent_idle_timeout(init, Config) ->
     IdleTimeout = 2000,
     emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout),
+    ok = snabbkaffe:start_trace(),
+    [{idle_timeout, IdleTimeout} | Config];
+t_connect_silent_idle_timeout('end', _Config) ->
+    snabbkaffe:stop(),
+    emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000),
+    ok.
+
+t_connect_silent_idle_timeout(Config) ->
+    %% Connect, send nothing more.
+    %% Connection should be dropped in roughly `IdleTimeout` ms.
+    IdleTimeout = ?config(idle_timeout, Config),
+    emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout),
+    SockOpts = [binary, {active, true}, {nodelay, true}],
+    {ok, Sock} = gen_tcp:connect({127, 0, 0, 1}, 1883, SockOpts, 5000),
+    ?assertReceive({tcp_closed, Sock}, IdleTimeout * 2),
+    ?assertMatch(
+        {ok, #{reason := {shutdown, idle_timeout}}},
+        ?block_until(#{?snk_kind := terminate}, IdleTimeout)
+    ).
+
+t_connect_idle_timeout(init, Config) ->
+    IdleTimeout = 2000,
+    emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout),
+    ok = snabbkaffe:start_trace(),
+    [{idle_timeout, IdleTimeout} | Config];
+t_connect_idle_timeout('end', _Config) ->
+    snabbkaffe:stop(),
+    emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000),
+    ok.
+
+t_connect_idle_timeout(Config) ->
+    %% Connect, send few bytes.
+    %% Connection should be dropped in roughly `IdleTimeout` ms.
+    IdleTimeout = ?config(idle_timeout, Config),
+    ConnectPacket = emqx_frame:serialize(?CONNECT_PACKET(#mqtt_packet_connect{})),
     emqx_config:put_zone_conf(default, [mqtt, idle_timeout], IdleTimeout),
-    {ok, Sock} = emqtt_sock:connect({127, 0, 0, 1}, 1883, [], 60000),
-    timer:sleep(IdleTimeout),
-    ?assertMatch({error, closed}, emqtt_sock:recv(Sock, 1024)).
+    SockOpts = [binary, {active, true}, {nodelay, true}],
+    {ok, Sock} = gen_tcp:connect({127, 0, 0, 1}, 1883, SockOpts, 5000),
+    ok = gen_tcp:send(Sock, binary:part(iolist_to_binary(ConnectPacket), 0, 4)),
+    ?assertReceive({tcp_closed, Sock}, IdleTimeout * 2),
+    ?assertMatch(
+        {ok, #{reason := {shutdown, idle_timeout}}},
+        ?block_until(#{?snk_kind := terminate}, IdleTimeout)
+    ).
 
 t_connect_emit_stats_timeout(init, Config) ->
     NewIdleTimeout = 1000,
     emqx_config:put_zone_conf(default, [mqtt, idle_timeout], NewIdleTimeout),
-    emqx_config:put_zone_conf(default, [mqtt, idle_timeout], NewIdleTimeout),
     ok = snabbkaffe:start_trace(),
     [{idle_timeout, NewIdleTimeout} | Config];
 t_connect_emit_stats_timeout('end', _Config) ->
     snabbkaffe:stop(),
     emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000),
-    emqx_config:put_zone_conf(default, [mqtt, idle_timeout], 15000),
     ok.
 
 t_connect_emit_stats_timeout(Config) ->
     ConnFun = ?config(conn_fun, Config),
-    {_, IdleTimeout} = lists:keyfind(idle_timeout, 1, Config),
+    IdleTimeout = ?config(idle_timeout, Config),
     {ok, Client} = emqtt:start_link([{proto_ver, v5}, {keepalive, 60} | Config]),
     {ok, _} = emqtt:ConnFun(Client),
+    %% Poke the connection to ensure stats timer is armed.
+    pong = emqtt:ping(Client),
     [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
-    ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
+    ?assertMatch(
+        TRef when is_reference(TRef),
+        emqx_connection:info(stats_timer, sys:get_state(ClientPid))
+    ),
     ?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0),
     ?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
     ok = emqtt:disconnect(Client).
@@ -490,7 +534,6 @@ t_connack_session_present(Config) ->
 t_connack_max_qos_allowed(init, Config) ->
     Config;
 t_connack_max_qos_allowed('end', _Config) ->
-    emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
     emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
     ok.
 t_connack_max_qos_allowed(Config) ->
@@ -500,7 +543,6 @@ t_connack_max_qos_allowed(Config) ->
 
     %% max_qos_allowed = 0
     emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 0),
-    emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 0),
 
     {ok, Client1} = emqtt:start_link([{proto_ver, v5} | Config]),
     {ok, Connack1} = emqtt:ConnFun(Client1),
@@ -537,7 +579,6 @@ t_connack_max_qos_allowed(Config) ->
 
     %% max_qos_allowed = 1
     emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 1),
-    emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 1),
 
     {ok, Client3} = emqtt:start_link([{proto_ver, v5} | Config]),
     {ok, Connack3} = emqtt:ConnFun(Client3),
@@ -574,7 +615,6 @@ t_connack_max_qos_allowed(Config) ->
 
     %% max_qos_allowed = 2
     emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
-    emqx_config:put_zone_conf(default, [mqtt, max_qos_allowed], 2),
 
     {ok, Client5} = emqtt:start_link([{proto_ver, v5} | Config]),
     {ok, Connack5} = emqtt:ConnFun(Client5),

+ 1 - 1
apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl

@@ -238,7 +238,7 @@ handle_in(Frame = ?MSG(MType), Channel) when
     ?SLOG(debug, #{msg => "recv_frame", frame => Frame, info => "jt808_client_deregister"}),
     do_handle_in(Frame, Channel#channel{conn_state = disconnected});
 handle_in(Frame, Channel) ->
-    ?SLOG(error, #{msg => "unexpected_lwm2m_frame", frame => Frame}),
+    ?SLOG(error, #{msg => "unexpected_jt808_frame", frame => Frame}),
     {shutdown, unexpected_frame, Channel}.
 
 handle_frame_error(Reason, Channel) ->

+ 1 - 0
changes/ce/fix-14260.en.md

@@ -0,0 +1 @@
+Fix rare race condition that might have caused connection process to crash when CONNECT packet is not fully received until idle timeout (15 seconds by default) passes.

+ 1 - 1
mix.exs

@@ -353,11 +353,11 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_bridge_cassandra,
       :emqx_bridge_opents,
       :emqx_bridge_dynamo,
+      :emqx_bridge_es,
       :emqx_bridge_greptimedb,
       :emqx_bridge_hstreamdb,
       :emqx_bridge_influxdb,
       :emqx_bridge_iotdb,
-      :emqx_bridge_es,
       :emqx_bridge_matrix,
       :emqx_bridge_mongodb,
       :emqx_bridge_mysql,

+ 1 - 0
rebar.config.erl

@@ -84,6 +84,7 @@ is_community_umbrella_app("apps/emqx_bridge_cassandra") -> false;
 is_community_umbrella_app("apps/emqx_bridge_opents") -> false;
 is_community_umbrella_app("apps/emqx_bridge_clickhouse") -> false;
 is_community_umbrella_app("apps/emqx_bridge_dynamo") -> false;
+is_community_umbrella_app("apps/emqx_bridge_es") -> false;
 is_community_umbrella_app("apps/emqx_bridge_greptimedb") -> false;
 is_community_umbrella_app("apps/emqx_bridge_hstreamdb") -> false;
 is_community_umbrella_app("apps/emqx_bridge_influxdb") -> false;